added some development tools
[windows-sources.git] / developer / Samples / NET 4.6 / Samples for Parallel / ParallelExtensionsExtras / Partitioners / ChunkPartitioner.cs
blob744f54b629039a15835f30e03072ad0719f1f393
1 //--------------------------------------------------------------------------
2 //
3 // Copyright (c) Microsoft Corporation. All rights reserved.
4 //
5 // File: ChunkPartitioner.cs
6 //
7 //--------------------------------------------------------------------------
9 using System.Collections.Generic;
10 using System.Threading;
12 namespace System.Collections.Concurrent.Partitioners
/// <summary>
/// Factory methods that build an orderable partitioner handing out an enumerable's
/// elements in chunks whose sizes follow rules chosen by the caller.
/// </summary>
public static class ChunkPartitioner
{
    /// <summary>
    /// Builds a partitioner whose chunk sizes are computed on demand by
    /// <paramref name="nextChunkSizeFunc"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
    /// <param name="source">The sequence to partition.</param>
    /// <param name="nextChunkSizeFunc">
    /// Maps the chunk size a partition requested last time (0 on its first request) to the
    /// size it should request next.
    /// </param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="nextChunkSizeFunc"/> is null.
    /// </exception>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, Func<int, int> nextChunkSizeFunc)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, nextChunkSizeFunc);
        return partitioner;
    }

    /// <summary>Builds a partitioner in which every chunk has the same fixed size.</summary>
    /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
    /// <param name="source">The sequence to partition.</param>
    /// <param name="chunkSize">The number of elements per chunk; must be positive.</param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is not positive.</exception>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, int chunkSize)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, chunkSize);
        return partitioner;
    }

    /// <summary>
    /// Builds a partitioner whose chunk sizes stay between <paramref name="minChunkSize"/>
    /// and <paramref name="maxChunkSize"/>.
    /// </summary>
    /// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
    /// <param name="source">The sequence to partition.</param>
    /// <param name="minChunkSize">The smallest chunk size; must be positive.</param>
    /// <param name="maxChunkSize">The largest chunk size; must not be below <paramref name="minChunkSize"/>.</param>
    /// <returns>A partitioner over <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The min/max bounds are invalid.</exception>
    public static OrderablePartitioner<TSource> Create<TSource>(
        IEnumerable<TSource> source, int minChunkSize, int maxChunkSize)
    {
        OrderablePartitioner<TSource> partitioner =
            new ChunkPartitioner<TSource>(source, minChunkSize, maxChunkSize);
        return partitioner;
    }
}
/// <summary>
/// Partitions an enumerable into chunks based on user-supplied criteria.
/// </summary>
/// <typeparam name="T">The type of the data being partitioned.</typeparam>
/// <remarks>
/// Every partition pulls its chunks from one enumerator over the source. It takes the next
/// chunk under a lock and tags each element with its zero-based position in the source.
/// Keys therefore increase within each partition and are normalized (0..n-1, no gaps).
/// Because chunks are handed out on demand, keys are NOT ordered across partitions.
/// </remarks>
internal sealed class ChunkPartitioner<T> : OrderablePartitioner<T>
{
    // Upper bound on the buffer capacity reserved before a chunk is filled. A partition may
    // still request a larger chunk; the buffer then grows as elements actually arrive.
    // Without this cap, a huge requested size (e.g. int.MaxValue for "one big chunk") forces a
    // giant allocation up front, which throws OutOfMemoryException even for a tiny source.
    private const int MaxPresizedChunkCapacity = 4096;

    private readonly IEnumerable<T> _source;
    private readonly Func<int, int> _nextChunkSizeFunc;

    /// <summary>Initializes a partitioner that asks <paramref name="nextChunkSizeFunc"/> for every chunk size.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="nextChunkSizeFunc">
    /// Receives the chunk size a partition requested previously (0 on that partition's first
    /// request) and returns the size of its next chunk, which must be positive.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="nextChunkSizeFunc"/> is null.
    /// </exception>
    public ChunkPartitioner(IEnumerable<T> source, Func<int, int> nextChunkSizeFunc)
        // keysOrderedInEachPartition = true: each partition takes its chunks from the shared
        //   enumerator in order, so its keys only increase.
        // keysOrderedAcrossPartitions = false: chunks are handed out on demand, so partition 0
        //   can receive keys 6..8 after partition 1 has received keys 3..5.
        // keysNormalized = true: keys are exactly 0..n-1 with no gaps.
        : base(true, false, true)
    {
        // Validate and store the enumerable and the function used to determine how big
        // to make the next chunk given the current chunk size.
        if (source == null) throw new ArgumentNullException("source");
        if (nextChunkSizeFunc == null) throw new ArgumentNullException("nextChunkSizeFunc");
        _source = source;
        _nextChunkSizeFunc = nextChunkSizeFunc;
    }

    /// <summary>Initializes a partitioner that always uses the same chunk size.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="chunkSize">The number of elements in every chunk; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is not positive.</exception>
    public ChunkPartitioner(IEnumerable<T> source, int chunkSize)
        : this(source, prev => chunkSize) // a function that always returns the specified chunk size
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");
    }

    /// <summary>Initializes a partitioner whose chunk sizes grow from a minimum up to a maximum.</summary>
    /// <param name="source">The data being partitioned.</param>
    /// <param name="minChunkSize">The first (smallest) chunk size; must be positive.</param>
    /// <param name="maxChunkSize">The largest chunk size; must not be below <paramref name="minChunkSize"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minChunkSize"/> is not positive or is greater than <paramref name="maxChunkSize"/>.
    /// </exception>
    public ChunkPartitioner(IEnumerable<T> source, int minChunkSize, int maxChunkSize)
        : this(source, CreateFuncFromMinAndMax(minChunkSize, maxChunkSize)) // grows from min to max
    {
        if (minChunkSize <= 0 ||
            minChunkSize > maxChunkSize) throw new ArgumentOutOfRangeException("minChunkSize");
    }

    /// <summary>
    /// Creates a chunk-size function that returns <paramref name="minChunkSize"/> first and then
    /// doubles the previous size on each request, never exceeding <paramref name="maxChunkSize"/>.
    /// </summary>
    private static Func<int, int> CreateFuncFromMinAndMax(int minChunkSize, int maxChunkSize)
    {
        return delegate(int prev)
        {
            if (prev < minChunkSize) return minChunkSize;
            if (prev >= maxChunkSize) return maxChunkSize;
            int next = prev * 2;
            // next < 0 guards against the doubling overflowing int.
            if (next >= maxChunkSize || next < 0) return maxChunkSize;
            return next;
        };
    }

    /// <summary>
    /// Partitions the underlying collection into the specified number of orderable partitions.
    /// </summary>
    /// <param name="partitionCount">The number of partitions to create.</param>
    /// <returns>
    /// The partitions. The shared source enumerator is disposed once all of them have been disposed.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionCount"/> is not positive.</exception>
    public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException("partitionCount");

        // Static partitions are backed by dynamic ones. Reference counting ties disposal of the
        // shared source enumerator to the last partition's disposal. This is needed because the
        // intermediate enumerable below is never returned to the caller to dispose.
        var partitions = new IEnumerator<KeyValuePair<long, T>>[partitionCount];
        var dynamicPartitions = GetOrderableDynamicPartitions(true);
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = dynamicPartitions.GetEnumerator();
        }
        return partitions;
    }

    /// <summary>Gets whether additional partitions can be created dynamically.</summary>
    public override bool SupportsDynamicPartitions { get { return true; } }

    /// <summary>
    /// Creates an object that can partition the underlying collection into a variable number of
    /// partitions.
    /// </summary>
    /// <returns>
    /// An object that can create partitions over the underlying data source. It implements
    /// <see cref="IDisposable"/>, and disposing it disposes the shared source enumerator.
    /// </returns>
    public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
    {
        return new EnumerableOfEnumerators(this, false);
    }

    /// <summary>Creates the dynamic-partition source, optionally reference-counting its partitions.</summary>
    /// <param name="referenceCountForDisposal">
    /// When true, the shared source enumerator is disposed when the last partition is disposed.
    /// When false, it is disposed when the returned object is disposed.
    /// </param>
    private IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions(bool referenceCountForDisposal)
    {
        return new EnumerableOfEnumerators(this, referenceCountForDisposal);
    }

    /// <summary>
    /// The object used to dynamically create partitions. All partitions it creates share one
    /// enumerator over the source and one key counter, both guarded by a private lock.
    /// </summary>
    private sealed class EnumerableOfEnumerators : IEnumerable<KeyValuePair<long, T>>, IDisposable
    {
        private readonly ChunkPartitioner<T> _parentPartitioner;
        // Private lock object. Never lock on the source's enumerator: outside code may hold a
        // reference to it, and it can even be the source object itself (a C# iterator may
        // return itself from GetEnumerator).
        private readonly object _sharedLock = new object();
        private readonly IEnumerator<T> _sharedEnumerator;
        private readonly bool _referenceCountForDisposal;
        private long _nextSharedIndex;
        private int _activeEnumerators;
        private bool _noMoreElements;
        private bool _disposed;

        public EnumerableOfEnumerators(ChunkPartitioner<T> parentPartitioner, bool referenceCountForDisposal)
        {
            if (parentPartitioner == null) throw new ArgumentNullException("parentPartitioner");

            // Store the data, including creating the single enumerator over the underlying source
            // that every partition will pull from.
            _parentPartitioner = parentPartitioner;
            _sharedEnumerator = parentPartitioner._source.GetEnumerator();
            _nextSharedIndex = -1; // incremented before use, so the first key is 0
            _referenceCountForDisposal = referenceCountForDisposal;
        }

        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

        /// <summary>Creates a new partition that draws chunks from the shared enumerator.</summary>
        public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
        {
            if (_referenceCountForDisposal)
            {
                Interlocked.Increment(ref _activeEnumerators);
            }
            return new Enumerator(this);
        }

        /// <summary>Called when a partition is disposed; releases the source when the last one goes away.</summary>
        private void DisposeEnumerator(Enumerator enumerator)
        {
            if (_referenceCountForDisposal)
            {
                if (Interlocked.Decrement(ref _activeEnumerators) == 0)
                {
                    _sharedEnumerator.Dispose();
                }
            }
        }

        /// <summary>One partition: serves elements from a private chunk buffer, refilling it from the shared enumerator.</summary>
        private sealed class Enumerator : IEnumerator<KeyValuePair<long, T>>
        {
            private readonly EnumerableOfEnumerators _parentEnumerable;
            private readonly List<KeyValuePair<long, T>> _currentChunk = new List<KeyValuePair<long, T>>();
            private int _currentChunkCurrentIndex;
            private int _lastRequestedChunkSize;
            private bool _disposed;

            public Enumerator(EnumerableOfEnumerators parentEnumerable)
            {
                if (parentEnumerable == null) throw new ArgumentNullException("parentEnumerable");
                _parentEnumerable = parentEnumerable;
            }

            /// <summary>Advances to the next element, fetching a new chunk when the current one is exhausted.</summary>
            /// <exception cref="ObjectDisposedException">This partition has been disposed.</exception>
            /// <exception cref="InvalidOperationException">The chunk-size function returned a non-positive size.</exception>
            public bool MoveNext()
            {
                if (_disposed) throw new ObjectDisposedException(GetType().Name);

                // Move to the next cached element. If a chunk was already retrieved and still has
                // data left in it, just use its next item.
                ++_currentChunkCurrentIndex;
                if (_currentChunkCurrentIndex >= 0 &&
                    _currentChunkCurrentIndex < _currentChunk.Count) return true;

                // Ask the user function how much data to fetch next. It is passed the previously
                // requested chunk size, which is 0 on this partition's first request so the
                // function can pick the initial size.
                int nextChunkSize = _parentEnumerable._parentPartitioner._nextChunkSizeFunc(_lastRequestedChunkSize);
                if (nextChunkSize <= 0) throw new InvalidOperationException(
                    "Invalid chunk size requested: chunk sizes must be positive.");
                _lastRequestedChunkSize = nextChunkSize;

                // Reset the buffer. Reserve space up front only up to a bounded amount; the list
                // grows on demand if the chunk turns out to be larger.
                _currentChunk.Clear();
                _currentChunkCurrentIndex = 0;
                int capacityToReserve = Math.Min(nextChunkSize, MaxPresizedChunkCapacity);
                if (capacityToReserve > _currentChunk.Capacity) _currentChunk.Capacity = capacityToReserve;

                // Grab the next chunk while holding the shared lock. This way the shared enumerator
                // and the key counter are advanced by one partition at a time.
                lock (_parentEnumerable._sharedLock)
                {
                    // If we've already discovered that no more elements exist (and we've gotten this
                    // far, which means we don't have any elements cached), we're done.
                    if (_parentEnumerable._noMoreElements) return false;

                    for (int i = 0; i < nextChunkSize; i++)
                    {
                        // Source exhausted: record it so other partitions don't have to check
                        // again, and report whether this chunk got any data at all.
                        if (!_parentEnumerable._sharedEnumerator.MoveNext())
                        {
                            _parentEnumerable._noMoreElements = true;
                            return _currentChunk.Count > 0;
                        }

                        ++_parentEnumerable._nextSharedIndex;
                        _currentChunk.Add(new KeyValuePair<long, T>(
                            _parentEnumerable._nextSharedIndex,
                            _parentEnumerable._sharedEnumerator.Current));
                    }
                }

                // We got a full chunk.
                return true;
            }

            /// <summary>
            /// The current (key, element) pair. Throws <see cref="InvalidOperationException"/> when no
            /// element is positioned, i.e. before the first MoveNext or after MoveNext returned false.
            /// </summary>
            public KeyValuePair<long, T> Current
            {
                get
                {
                    if (_currentChunkCurrentIndex >= _currentChunk.Count)
                    {
                        throw new InvalidOperationException("There is no current item.");
                    }
                    return _currentChunk[_currentChunkCurrentIndex];
                }
            }

            /// <summary>Disposes this partition (idempotent) and notifies the parent.</summary>
            public void Dispose()
            {
                if (!_disposed)
                {
                    _parentEnumerable.DisposeEnumerator(this);
                    _disposed = true;
                }
            }

            object IEnumerator.Current { get { return Current; } }

            public void Reset() { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Disposes the shared source enumerator unless partitions are reference-counted, in which
        /// case the last partition's disposal releases it instead (see DisposeEnumerator).
        /// </summary>
        public void Dispose()
        {
            if (!_disposed)
            {
                if (!_referenceCountForDisposal) _sharedEnumerator.Dispose();
                _disposed = true;
            }
        }
    }
}